# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import math
import pytest
import struct
import subprocess
from os.path import join
from subprocess import call

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import get_fs_path

# (file extension, table suffix) pairs
compression_formats = [
  ('.bz2',     'bzip'),
  ('.deflate', 'def'),
  ('.gz',      'gzip'),
  ('.snappy',  'snap'),
 ]


# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
# filesystem.
@SkipIfS3.hive
@SkipIfADLS.hive
@SkipIfIsilon.hive
@SkipIfLocal.hive
class TestCompressedFormats(ImpalaTestSuite):
  """
  Tests that we support compressed RC, sequence and text files and that unsupported
  formats fail gracefully (see IMPALA-14: Files with .gz extension reported as 'not
  supported').
  """
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestCompressedFormats, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.clear()
    cls.ImpalaTestMatrix.add_dimension(\
        ImpalaTestDimension('file_format', *['rc', 'seq', 'text']))
    cls.ImpalaTestMatrix.add_dimension(\
        ImpalaTestDimension('compression_format', *compression_formats))
    if cls.exploration_strategy() == 'core':
      # Don't run on core.  This test is very slow and we are unlikely
      # to regress here.
      cls.ImpalaTestMatrix.add_constraint(lambda v: False);

  @pytest.mark.execute_serially
  def test_compressed_formats(self, vector):
    file_format = vector.get_value('file_format')
    extension, suffix = vector.get_value('compression_format')
    if file_format in ['rc', 'seq']:
      # Test that compressed RC/sequence files are supported.
      db_suffix = '_%s_%s' % (file_format, suffix)
      self._copy_and_query_compressed_file(
       'tinytable', db_suffix, suffix, '000000_0', extension)
    elif file_format is 'text':
      # TODO: How about LZO?
      if suffix in ['gzip', 'snap', 'bzip']:
        # Test that {gzip,snappy,bzip}-compressed text files are supported.
        db_suffix = '_%s_%s' % (file_format, suffix)
        self._copy_and_query_compressed_file(
          'tinytable', db_suffix, suffix, '000000_0', extension)
      else:
        # Deflate-compressed (['def']) text files (or at least text files with a
        # compressed extension) have not been tested yet.
        pytest.skip("Skipping the text/def tests")
    else:
      assert False, "Unknown file_format: %s" % file_format

  # TODO: switch to using hive metastore API rather than hive shell.
  def _copy_and_query_compressed_file(self, table_name, db_suffix, compression_codec,
                                     file_name, extension, expected_error=None):
    # We want to create a test table with a compressed file that has a file
    # extension. We'll do this by making a copy of an existing table with hive.
    base_dir = '/test-warehouse'
    src_table = 'functional%s.%s' % (db_suffix, table_name)
    src_table_dir = "%s%s" % (table_name, db_suffix)
    src_table_dir = join(base_dir, src_table_dir)
    src_file = join(src_table_dir, file_name)

    # Make sure destination table uses suffix, even if use_suffix=False, so
    # unique tables are created for each compression format
    dest_table = '%s_%s_copy' % (table_name, compression_codec)
    dest_table_dir = join(base_dir, dest_table)
    dest_file = join(dest_table_dir, file_name + extension)

    drop_cmd = 'DROP TABLE IF EXISTS %s;' % (dest_table)
    hive_cmd = drop_cmd + 'CREATE TABLE %s LIKE %s;' % (dest_table, src_table)

    # Create the table
    call(["hive", "-e", hive_cmd]);
    call(["hadoop", "fs", "-cp", src_file, dest_file])
    # Try to read the compressed file with extension
    query = 'select count(*) from %s' % dest_table
    try:
      # Need to invalidate the metadata because the table was created external to Impala.
      self.client.execute("invalidate metadata %s" % dest_table)
      result = self.execute_scalar(query)
      # Fail iff we expected an error
      assert expected_error is None, 'Query is expected to fail'
    except Exception as e:
      error_msg = str(e)
      print error_msg
      if expected_error is None or expected_error not in error_msg:
        print "Unexpected error:\n%s", error_msg
        raise
    finally:
      call(["hive", "-e", drop_cmd]);

class TestUnsupportedTableWriters(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestUnsupportedTableWriters, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    # This class tests different formats, but doesn't use constraints.
    # The constraint added below is only to make sure that the test file runs once.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format =='text' and
        v.get_value('table_format').compression_codec == 'none'))

  def test_error_message(self, vector, unique_database):
    # Tests that an appropriate error message is displayed for unsupported writers like
    # compressed text, avro and sequence.
    self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)

@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
  """
  Tests that Impala handles compressed files in HDFS larger than 1GB.
  This test creates a 2GB test data file and loads it into a table.
  """
  TABLE_NAME = "large_compressed_file"
  TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
  """
  Name the file with ".snappy" extension to let scanner treat it as
  a snappy block compressed file.
  """
  FILE_NAME = "largefile.snappy"
  # Maximum uncompressed size of an outer block in a snappy block compressed file.
  CHUNK_SIZE = 1024 * 1024 * 1024
  # Limit the max file size to 2GB or too much memory may be needed when
  # uncompressing the buffer. 2GB is sufficient to show that we support
  # size beyond maximum 32-bit signed value.
  MAX_FILE_SIZE = 2 * CHUNK_SIZE

  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestLargeCompressedFile, cls).add_test_dimensions()

    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip("skipping if it's not exhaustive test.")
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format =='text' and
        v.get_value('table_format').compression_codec == 'snap'))

  def teardown_method(self, method):
    self.__drop_test_table()

  def __generate_file(self, file_name, file_size):
    """Generate file with random data and a specified size."""

    # Read the payload compressed using snappy. The compressed payload
    # is generated from a string of 50176 bytes.
    payload_size = 50176
    hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat",
        "/test-warehouse/compressed_payload.snap"], stdout=subprocess.PIPE)
    compressed_payload = hdfs_cat.stdout.read()
    compressed_size = len(compressed_payload)
    hdfs_cat.stdout.close()
    hdfs_cat.wait()

    # The layout of a snappy-block compressed file is one or more
    # of the following nested structure which is called "chunk" in
    # the code below:
    #
    # - <big endian 32-bit value encoding the uncompresed size>
    # - one or more blocks of the following structure:
    #   - <big endian 32-bit value encoding the compressed size>
    #   - <raw bits compressed by snappy algorithm>

    # Number of nested structures described above.
    num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
    # Number of compressed snappy blocks per chunk.
    num_blocks_per_chunk = self.CHUNK_SIZE / (compressed_size + 4)
    # Total uncompressed size of a nested structure.
    total_chunk_size = num_blocks_per_chunk * payload_size

    hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
        stdin=subprocess.PIPE, bufsize=-1)
    for i in range(num_chunks):
      hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
      for j in range(num_blocks_per_chunk):
        hdfs_put.stdin.write(struct.pack('>i', compressed_size))
        hdfs_put.stdin.write(compressed_payload)
    hdfs_put.stdin.close()
    hdfs_put.wait()

  def test_query_large_file(self, vector):
    self.__create_test_table();
    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
    file_size = self.MAX_FILE_SIZE
    self.__generate_file(dst_path, file_size)
    self.client.execute("refresh %s" % self.TABLE_NAME)

    # Query the table
    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)

  def __create_test_table(self):
    self.__drop_test_table()
    self.client.execute("CREATE TABLE %s (col string) " \
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
        % (self.TABLE_NAME, self.TABLE_LOCATION))

  def __drop_test_table(self):
    self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)

class TestBzip2Streaming(ImpalaTestSuite):
  MAX_SCAN_RANGE_LENGTHS = [0, 5]

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestBzip2Streaming, cls).add_test_dimensions()

    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip("skipping if it's not exhaustive test.")
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('max_scan_range_length', *cls.MAX_SCAN_RANGE_LENGTHS))
    cls.ImpalaTestMatrix.add_constraint(lambda v:\
        v.get_value('table_format').file_format == 'text' and\
        v.get_value('table_format').compression_codec == 'bzip')

  def test_bzip2_streaming(self, vector):
    self.run_test_case('QueryTest/text-bzip-scan', vector)
